消息队列之 RabbitMQ

简介

  RabbitMQ 是采用 Erlang 语言实现 AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)的消息中间件, 它最初起源于金融系统, 用于在分布式系统中存储转发消息。

特性

  • 可靠性:RabbitMQ 使用 一些机制来保证可靠性 ,如持久化、传输确认及发布确 认等 。
  • 异步消息传递:支持多种消息传输协议、消息排队、交付确认、灵活的路由到队列、多种交换类型
  • 灵活路由:在消息进入队列之前,通过交换机来路由消息 。对于典型的路由功能,RabbitMQ 己经提供了 一些内置的交换机来实现。针对更复杂的路由功能,可以将多个交换机绑定在一起,也可以通过插件机制来实现自己的交换机
  • 多语言客户端:支持 Java, .NET, PHP, Python, JavaScript, Ruby, Go 等多种语言
  • 分布式部署:以集群方式部署,以获得高可用性和高吞吐量;在多个可用区和地区之间进行联合
  • 企业和云:可插入式认证、授权,支持 TLS 和 LDAP。轻量级,易于在公共和私有云中部署
  • 插件机制:多样化的工具和插件支持持续集成、运营指标以及与其他企业系统的集成。灵活的插件方法用于扩展 RabbitMQ 功能
  • 管理和监控:提供用于管理和监控 RabbitMQ 的 API 接口、命令行工具和 UI 界面

快速入门

  新建两个项目,一个producer-demo项目用于生产消息,一个项目consumer-demo用于消费消息,之后进行以下步骤:

  • ① 引入依赖
  • ② application.yml 配置
  • ③ 生产消息
  • ④ 消费消息
  • ⑤ 测试

① 引入依赖

  引入依赖:

1
2
3
4
5
<!-- AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

② application.yml 配置

  application.yml文件配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /

③ 生产消息

  在producer-demo项目中创建生产者并启动应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class ProducerDemo {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 应用启动后,每 200ms 发送一次消息
*/
@PostConstruct
public void initSendMsg() {
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(200L);
String queueName = "simple.queue";
String message = "hello world " + i;
rabbitTemplate.convertAndSend(queueName, message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

}

注意默认情况下,simple.queue队列不会被 RabbitMQ 创建,所以需要提前去手动创建。

④ 消费消息

  在consumer-demo项目中创建消费者并启动应用:

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class ConsumerDemo {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) {
log.info("消费消息: {}", msg);
}

}

⑤ 测试

  若consumer-demo项目中出现以下日志,则代表消息生产发送成功及消费者消费消息成功:

1
2
消费消息: hello world 1
消费消息: hello world 2

基本概念

  RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

  从计算机术语层面来说, RabbitMQ 模型更像是一种交换机模型。

  RabbitMQ 的整体模型架构如下图所示:

RabbitMQ 模型架构

Producer

  Producer,即生产者, 生产者是创建消息和投递消息(发布到 RabbitMQ )的一方。
  生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者( Consumer )。

Consumer

  Consumer,即消费者, 接收消息的一方。

  消费者连接到 RabbitMQ 服务器, 并订阅到队列上
  当消费者消费一条消息时,只是消费消息的消息体( payload )。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体, 消费者不知道消息的生产者是谁, 当然,消费者也不需要知道。

Broker

  Broker,即消息中间件的服务节点 。

  对于 RabbitMQ 来说, 一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点或者 RabbitMQ 服务实例。

Connection

  AMQP 是一个使用 TCP 提供可靠投递的应用层协议。AMQP 连接通常是长连接。AMQP 使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。

Channel

  有些应用需要与 AMQP 代理建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。

  AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。

  在涉及多线程/进程的应用中,为每个线程/进程开启一个通道(channel)是很常见的,并且这些通道不能被线程/进程共享。

  一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。

Queue

  Queue,即队列,是 RabbitMQ 内部用于存储消息的对象。

  RabbitMQ 中消息都只能存储在队列中,这一点和 Kafka 这种消息中间件相反。 Kafka 将消息存储在 topic (主题)这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费 。

分摊消息

  多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊( Round-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。

不支持广播

  RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。

额外属性

  队列跟交换机共享某些属性,但是队列也有一些另外的属性。

  • Name
  • Durable(消息代理重启后,队列依旧存在)
  • Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
  • Auto-delete(当最后一个消费者退订后即被删除)
  • Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)

声明使用

  队列在声明(declare)后才能被使用:

  • 若一个队列尚不存在,声明一个队列时会创建它
  • 若声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响
  • 若声明中的属性与已存在队列的属性有差异,那么将抛出一个错误代码为 406 的通道级异常

Message

  消息存在于 RabbitMQ 的队列中,消息主要包括两个部分:

  • 消息体(payload):一般是一个带有业务逻辑结构的数据, 比如一个 JSON 字符串
  • 标签 ( Label ):用来表述这条消息,比如一个交换机的名称和一个路由键

Exchange

  Exchange,即交换机(简写X代表)。

  之前我们可能理解认为是生产者将消息投递到队列中的,但其实上这个想法在 RabbitMQ 中不会发生。实际上,是生产者首先将消息发送到交换机,之后由交换机将消息路由到一个或者多个队列中。如果路由不到,可能会返回给生产者一个错误,也可能直接丢弃消息。

Binding

  绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。

  如果要指示交换机E将消息路由给队列Q,那么Q就需要与E进行绑定。

  绑定操作需要定义一个可选的路由键(Routing Key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

  举个海南旅游飞机出行的例子:

  • 队列(Queue)是我们想要去的目的地——海南
  • 交换机(Exchange)是深圳宝安国际机场
  • 绑定(Binding)就是深圳宝安国际机场到目的地的路线,能够到达目的地的路线可以是一条或者多条

  拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。

  如果 AMQP 的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。

RoutingKey

  RoutingKey,即路由键。

  生产者将消息发给交换机的时候, 一般会指定一个 Routing Key,用来指定这个消息的路由规则,而 Routing Key 需要与交换机类型和绑定键( Binding Key )联合使用才能最终生效。

  在交换机类型和绑定键( BindingKey )固定的情况下,生产者可以在发送消息给交换机时, 通过指定 RoutingKey 来决定消息流向哪里。

  生产者将消息发送给交换机时,需要一个 RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换机的时候 , 这些绑定允许使用相同的 BindingKey。

  在某些情形下 , RoutingKey 与 BindingKey 可以看作同一个东西。

  在 direct 交换机类型下, RoutingKey 和 BindingKey 需要完全匹配才能使用,此时 RoutingKey 与 BindingKey 可以看作同一个东西。

  但是,在 topic 交换机类型下,RoutingKey 和 BindingKey 之间需要做模糊匹配, 两者并不是相同的。

  BindingKey 其实也属于路由键中的一种, 官方解释为:the routing key to use for the binding。 可以翻译为:在绑定的时候使用的路由键。大多数时候,为了避免混淆,包括官方文档和 RabbitMQ Java API 中都把 BindingKey 和 RoutingKey 看作 RoutingKey。

  可以这么理解:

  • 在使用绑定的时候, 其中需要的路由键是 BindingKey。涉及的客户端方法如: channel.exchangeBind、 channel .queueBind, 对应的 AMQP 命令(详情参见 2.2 节)为 Exchange.Bind、 Queue.Bind。
  • 在发送消息的时候, 其中需要的路由键是 RoutingKey 。涉及的客户端方法如 channel.basicPublish ,对应的 AMQP 命令为 Basic.Publish

  由于某些历史的原因, 包括现存能搜集到的资料显示:大多数情况下习惯性地将 BindingKey 写成 RoutingKey, 尤其是在使用 direct 类型的交换机的时候。

失效情况

  BindingKey 并不是在所有的情况下都生效 ,它依赖于交换机类型,比如 fanout 类型的交换机就会无视 BindingKey, 而是将消息路由到所有绑定到该交换机的队列中。

类型

  RabbitMQ 常用的交换机类型主要有以下四种 ,不同的类型有着不同的路由策略:

  • fanout
  • direct
  • topic
  • headers

  当然,AMQP 协议里还提到另外两种类型:System 和自定义,这里不予描述。

fanout

  对于 fanout (扇型)类型的交换机路由规则而言,其会把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中。

direct

  对于 direct 类型的交换机路由规则而言,其只会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。

Direct Exchange

  如上图的 direct 交换机:

  • 如果我们发送一条消息, 并在发送消息的候设置路由键为error, 则消息会路由到 Queue 1 和 Queue 2
  • 如果在发送消息的时候设置路由键为warninfo,消息只会路由到 Queue 2,
  • 如果以其他的路由键发送消息, 则消息不会路由发送到 Queue 1 和 Queue 2 两个队列中

topic

  direct 类型的交换机的路由规则需要完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。

  因此,topic (主题)类型的交换机在匹配规则上进行了扩展, 虽然它与 direct 类型的交换机相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但是,这里的匹配规则有些许不同,其约定 :

  • RoutingKey 为一个点号分隔的字符串,如com.rabbitmq.client
  • BindingKey 和 RoutingKey 一样也是点号分隔的字符串
  • BindingKey 中可存在两种特殊字符串用于做模糊匹配,其中用于匹配一个单词,#用于匹配多规格单词(可以是零个)

Topic Exchange

  如上图的 topic 交换机:

  • 路由键为prod.orderprod.pay的消息会路由到 Queue 1 中
  • 路由键为test.orderdev.order的消息会路由到 Queue 2 中
  • 路由键为mock.order的消息会路由到 Queue 2 中

headers

  headers 类型的交换机不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

  在绑定队列和交换机时制定一组键值对,当发送消息到交换机时, RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换机绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。

  由于 headers 类型的交换机性能会很差,而且也不实用,所以实际场景下基本上不会看到它的存在。

Virtual Host

  Virtual Host,即虚拟主机,是对 Queue、Exchange 等资源的逻辑分组。

  Rabbit MQ 为什么引入虚拟主机这个概念呢?
  主要是为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列等),这跟 Web Server(如 Nginx) 中虚拟主机概念非常相似。因此,当连接被建立的时候,AMQP 客户端可以指定使用哪个虚拟主机。

AMQP 协议

  RabbitMQ 遵从 AMQP 协议,换句话说, RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议)。

  目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

  RabbitMQ 中的交换机、交换机类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相应的概念。

  AMQP 的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换机,交换机和队列绑定。当生产者发送消息时所携带的 RoutingKey 与绑定时的 BindingKey 相匹配时,消息即被存入相应的队列之中,消费者则从订阅相应的队列来获取消息。

层次

  AMQP 协议本身包括三层:

  • Module Layer: 位于协议最高层, 主要定义了一些供客户端调用的命令, 客户端可以利 用这些命令实现自己的业务逻辑。 例如 ,客户端可以使用 Queue . Declare 命令声明一个队列或者使用 Basic . Consume 订阅消费一个队列中的消息
  • Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应 答返回给客户端, 主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理
  • Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等

  AMQP 本质就是一个通信协议,而通信协议都会涉及报文交互。从 low“level 举例来说,AMQP 本身是应用层的协议,其填充于 TCP 协议层的数据部分。而从 high-level 来说,AMQP 是通过协议命令进行交互的。AMQP 协议可以看作一系列结构化命令的集合, 这里的命令代表一种操作,类似于 HTTP 中的方法( GET、 POST、 PUT、 DELETE 等)。

工作流程

  无论是何种消息队列,其工作流程都是:生产者生产消息到一个存储者中,之后消费者从存储者中获取消息进行消费。

  对 RabbitMQ 而言,同样如此(多了一个特殊的交换机中间层),因此,RabbitMQ 的工作流程是怎样的,只需要分析其消息的生产和消费流程即可。

生产者

  生产者发送消息的时候:

  • ① 生产者连接到 RabbitMQ Broker,建立一个连接(Connection ),开启一个信道(Channel)
  • ② 生产者声明一个交换机,并设置相关属性(如交换机类型、是否持久化等)
  • ③ 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
  • ④ 生产者通过路由键将交换机和队列绑定起来
  • ⑤ 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换机等信息
  • ⑥ 相应的交换机根据接收到的路由键查找相匹配的队列
  • ⑦ 如果找到,则将从生产者发送过来的消息存入相应的队列中
  • ⑧ 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  • ⑨ 关闭信道
  • ⑩ 关闭连接

消费者

  消费者接收消息 的过程:

  • ① 消费者连接到 RabbitMQ Broker, 建立一个连接( Connection ), 开启一个信道( Channel)
  • ② 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
  • ③ 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息
  • ④ 消费者确认( ack )接收到的消息
  • ⑤ RabbitMQ 从队列中删除相应己经被确认的消息
  • ⑥ 关闭信道
  • ⑦ 关闭连接

工作原理

  待研究。

高级

不可达消息

  若消息在传递过程中未到达目的地,消息之后会被怎样对待?

  RabbitMQ 给出了三个答案:

  • 丢弃
  • 返回给生产者
  • 存储起来

丢弃

  消息是否丢弃不易单独说明,具体见下文。

返回给生产者

  mandatoryimmediate是发送消息时调用的channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。

mandatory

  对于mandatory参数的设置而言:

  • 当设为true时,若交换机无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用Basic.Return命令将消息返回给生产者
  • 当设置为false时,出现上述情形,则消息直接被丢弃,注意此值为默认值

  若mandatory参数设置为true,生产者如何获取到没有被正确路由到合适队列的消息呢?

  这可以通过调用channel.addReturnListener来添加ReturnListener监听器来实现。

immediate(已过时)

  对于immediate参数的设置而言:

  • 当设为true时,如果交换机在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。 当与路由键匹配的所有队列都没有消费者时, 该消息会通过Basic.Return命令将消息返回给生产者。

  RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持, 对此 RabbitMQ 官方解释是: immediate 参数会影响镜像队列的性能, 增加了代码复杂性, 建议采用 TTL 和 DLX 的方法替代。

mandatory VS immediate

  概括来说:

  • mandatory参数告诉服务器,至少将该消息路由到一个队列中,否则将消息返回给生产者
  • immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了

存储

  RabbitMQ 提供的备份交换机(Alternate Exchange)则可以将未能被交换机路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。

Alternate Exchange

  Alternate Exchange,即备份交换机,简称 AE。

  生产者在发送消息的时候:

  • 如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失
  • 如果设置了mandatory参数,那么需要添加额外的编程逻辑,生产者的代码将变得复杂

  如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换机,这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息 。

创建方式

  如何声明一个备份交换机?

  可以通过在声明交换机调用channel.exchangeDeclare的时候添加 alternate-exchange参数来实现,也可以通过策略的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy 的设置。

  备份交换机其实和普通的交换机没有太大的区别, 为了方便使用, 建议设置为 fanout 类型,如若读者想设置为 direct 或者 topic 的类型也没有什么不妥。需要注意的是,消息被重新发送到备份交换机时的路由键和从生产者发出的路由键是一样的。

  考虑这样一种情况, 如果备份交换机的类型是 direct,并且有一个与其绑定的队列,假设绑定的路由键是 key1,当某条携带路由键为 key2 的消息被转发到这个备份交换机的时候, 备份交换机没有匹配到合适的队列 ,则消息丢失。如果消息携带的路由键为 key1,则可以存储到队列中。

兼容性

  对于备份交换机, 总结了以下几种特殊情况:

  • 若备份交换机和mandatory参数一起使用,那么mandatory参数无效
  • 若设置的备份交换机不存在, 客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失
  • 若备份交换机没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失
  • 若备份交换机没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失

持久化机制

  持久化机制可以提高 RabbitMQ 的可靠性, 以防在异常情况(重启 、关闭、宿机等)下的数据丢失。

类型

  RabbitMQ 的持久化分为三个部分:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

交换机持久化

  交换机的持久化是通过在声明队列是将durable参数置为true实现的。

  如果交换机不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换机中了。

  对一个长期使用的交换机来说,建议将其置为持久化的 。

队列持久化

  队列的持久化是通过在声明队列时将durable参数置为true实现的。

  如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。

消息持久化

  队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部 所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。

  消息的持久化可通过将消息的投递模式 (BasicProperties中的deliveryMode属性)设置为 2 实现。

如何设置持久化?

  设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。
  单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。因此,单单设置消息持久化而不设置队列的持久化将毫无意义。

  虽然可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能(随机)。因为写入磁盘的速度比写入内存的速度慢得不只一点点。

  因此,对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。当然,在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。

持久化设置后消息就不会丢失了嘛?

  那么,将交换机、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?

  答案是否定的。

  首先从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为 true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将 autoAck 参数设置为 false,并进行手动确认即可。

  其次,在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视〉才能存入磁盘之中。 RabbitMQ 并不会为每条消息都进行同步存盘(调用内核的 fsync1方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内 RabbitMQ 服务节点发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。

  这个问题应该如何解决呢?

  这里可以引入 RabbitMQ 的镜像队列机制,相当于配置了副本,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点( slave ),这样有效地保证了高可用性, 除非整个集群都挂掉。虽然这样也不能完全保证 RabbitMQ 消息不丢失,但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,因此,在实际生产环境中的关键业务队列一般都会设置镜像队列。

  当然,还可以在发送端引入事务机制或者发送方确认机制来保证消息己经正确地发送并存储至 RabbitMQ 中,前提还要保证在调用channel.basicPublish方法的时候交换机能够将消息正确路由到相应的队列之中。

确认的消息

生产者确认消息成功发送

  虽然在 RabbitMQ 中可通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,但我们还会遇到一个问题,当消息的生产者将消息发送出去之后,怎么知道消息到底有没有正确地到达服务器呢?

  如果不进行特殊配置, 默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。

  如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

  针对这个问题,RabbitMQ 提供了两种解决方式:

  • ① 事务机制
  • ② 发送方确认( publisher confirm)机制

事务机制

  RabbitMQ 客户端中与事务机制相关的方法有三个:

  • channel.txSelect:用于将当前的信道设置成事务模式
  • channel.txCommit:用于提交事务
  • channel.txRollback:用于事务回滚

  在通过channel.txSelect方法开启事务之后,我们便可以发布消息给 RabbitMQ 了:

  • 如果事务提交成功,则消息一定到达了 RabbitMQ 中
  • 如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚
缺点

  事务虽然能够解决消息发送方和 RabbitMQ 之间消息确认的问题,但是使用它会降低 RabbitMQ 的性能,为什么呢?原因在于事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。

  那么有没有更好的方法,既能保证消息发送方能够确认消息已经正确送达, 又能不带来性能上的损失呢?

  从 AMQP 协议层面来看并没有更好的办法,但是 RabbitMQ 提供了一个改进方案, 即发送方确认机制。

发送方确认机制

  采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量, 因此 RabbitMQ 引入了一种轻量级的方式一发送方确认( publisher confirm )机制 。

  生产者将信道设置成 confirm (确认)模式, 一旦信道进入 confirm 模式, 所有在该信道上面发布的消息都会被指派一个唯一的 ID (从 1 开始), 一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID ),这就使得生产者知晓消息已经正确到达了目的地了。

  如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。

  RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel . basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都己经得到了处理,

  事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的。

  一旦发布一条消息, 生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之 后,生产者应用程序便可以通过回调方法来处理该确认消息。

  那么,如果出错了呢?
  如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该 nack 命令。

  生产者通过调用 channel . confirmSelect 方法(即 Confirm.Select 命令)将信道 设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消息既被 ack 又被 nack 的情况 , 并且 RabbitMQ 也并没有对消息被 confirm 的快慢做任何保证。

事务机制 VS 发送方确认机制

  • 事务机制和发送方确认机制两者是互斥的, 不能共存。 因此,如果企图将已开启事务模式 的信道再设置为 publisher confmn 模式, RabbitMQ 会报错,或者如果企图将已开启 publisher confirm 模式的信道再设置为事务模式, RabbitMQ 也会报错
  • 即使使用了事务机制或发送方确认机制,消息也可能丢失。事务和发送方确认机制确保的是消息能够正确地发送至 RabbitMQ ,这里的发送至 RabbitMQ的含义是指消息被正确地发往至 RabbitMQ 的交换机, 如果此交换机没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换机能够有匹配的队列。更进一步地讲, 发送方要配合 mandatory 参数或者备份交换机一起使用来提高 消息传输的可靠性

  发送方确认机制的优势在于并不一定需要同步确认。 这里我们改进了 一下使用方式, 有如下两种:

  • 批量 confirm 方法:每发送一批消息后, 待服务器的确认返回 。调用 channel.waitForConfirm s 方法, 等待服务器的确认返回 。
  • 异步 confirm 方法:提供一个回调方法, 服务端确认了 一条或者多条消息后客户端会回调这个方法进行处理。

  在批量 confirm 方法中, 客户端程序需要定期或者定量( 达到多少条), 亦或者两者结合起 来调用 channel .waitForConfirms 来等待 RabbitMQ 的确认返回。 相比于前面示例中的普 通 confmn 方法, 批量极大地提升了 confmn 的效率, 但是问题在于出现返回 Basic.Nack 或 者超时情况时, 客户端需要将这一批次的消息 全部重发, 这会带来明显的重复消息数量 ,并且 当消息经常丢失时, 批量 confirm 的性能应该是不升反降的。

  异步 confirm 方法的编程实现最为复杂 。 在客户端 Channel 接口中提供的 addConfirmListener 方法可以添加 ConfirmListener 这个回调接口, 这个 Conf irmListener 接口包含两个方法: handleAck 和 handleNack ,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic .Nack 。在这两个方法中都包含有 一 个参数 deli veryTag (在 publisher confirm 模式下用来标记消息的唯一有 序序号) 。 我们需要为每一个信道维护一个“ unconfirm ”的消息序号集合, 每发送一条消息, 集合中的元素加 1 。 每当调 用 ConfirmListener 中的 handleAck 方法时, “ uncon曲m ”集合中删掉相应 的一条 (multiple 设置为 false )或者多条( multiple 设置为 true )记录。 从程序运行效率上来看 , 这个“unconfrrm”集 合最好采用有序集合 SortedSet 的存储结构。 事实上, Java 客户端 SDK 中的 waitForConfirms 方法也是通过 SortedSet 维护消息序号的。 代码清单 4-19 为我们 演示了异步 confirm 的编码实现, 其中的 confirmSet 就是一个 SortedSet 类型的集合。

消费者确认消费消息

  消费者如何正确地消费消息?
  消费者客户端可以通过推模式或者拉模式的方式来获取并消费消息,当消费者处理完业务逻辑需要手动确认消息己被接收,这样 RabbitMQ 才能把当前消息从队列中标记清除。

  当然如果消费者由于某些原因无法处理当前接收到的消息,可以通过channel.basicNack或者channel.basicReject来拒绝掉。

  这里对于RabbitMQ 消费端来说, 还有几点需要注意:

  • 消息分发;
  • 消息顺序性;
  • 弃用 QueueingConsumer

消息分发

  当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。

  每条消息只会发送给订阅列表里的一个消费者。 这种方式非常适合扩 展,而且它是专门为并发程序设计的。 如果现在负载加重,那只需要创建更多的消费者来消费处理消息即可。

  当然,很多时候轮询的分发机制也不是那么优雅 。

  默认情况下,如果有 n 个消费者,那么 RabbitMQ 会将第 m 条消息分发给第m%n(取余的方式)个消费者,RabbitMQ 不管消费者是否消费并己经确认( Basic.Ack )了消息。

  试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。

  那么该如何处理这种情况呢?

  这里就要用 到 channel.basicQo s(int prefetchCount) 这个方法, 如前面章节所述, channel.basicQos 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。

  举例说明,在订阅消费队列之前,消费端程序调用了channel.basicQos()之后订阅了某个队列进行消费 。 RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者 再发送任何消息。 直到消费者确认了某条消息之后,RabbitMQ 将相应的计数减 1,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP/IP 中的“滑动窗口”。

消息顺序性

  消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。

  举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为 msg1、 msg2、 msg3,那么消费者必然也是按照 msg1、 msg2 、 msg3 的顺序进行消费的。

  目前很多资料显示 RabbitMQ 的消息能够保障顺序性, 这是不正确的, 或者说这个观点有很大的局限性。

有序的消息

  满足以下条件时,可以保证消息的顺序性:

  • 只有一个生产者
  • 只有一个消费者
  • 不使用任何 RabbitMQ 的高级特性
  • 没有消息丢失、网络故障之 类异常的 情况发生
无序的消息

  那么哪些情况下 RabbitMQ 的消息顺序性会被打破呢?

  下面介绍几种常见的情形。

生产者启用的事务机制异常

  如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚, 那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。

生产者启用的发送确认机制异常

  同样,如果启用 publisher confirm 时,在发生超时、中断,又或者是收到 RabbitMQ 的 Basic.Nack 命令时,那么同样需要补偿发送, 结果与事务机制一样会错序。或者这种说法有些牵强,我们可以固执地认为消息的顺序性保障是从存入队列之后开始的,而不是在发送时候开始的。

消息存在过期

  考虑另一种情形,如果生产者发送的消息设置了不同的超时时间,井且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。

消息存在优先级

  再考虑一种情形,如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。

消息拒绝重复消费

  如果一个队列按 照前后顺序分有 msg1、msg2、msg3、msg4 这 4 个消息,同时有 Consumer A 和 ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,Consumer A 中的消息为 msg1 和 msg3, ConsumerB 中的消息为 msg2、msg4。ConsumerA 收到消息 msg1 之后并不想处理而调用了 Basic.Nack/.Reject将消息拒绝,与此同时将 requeue 设置为 true,这样这条消息就可以重新存入队列中。消息 msg1 之后被发送到了 ConsumerB 中,此时 ConsumerB 已经消费了 msg2、msg4,之后再消费 msg1,这样消息顺序性 也就错乱了。或者消息 msg1 又重新发往 ConsumerA 中,此时 ConsumerA 已经消费了 msg3,那么再消费 msg1,消息顺序性也无法得到保障。同样可以用在 Basic.Recover 这个 AMQP 命令中 。

多个生产者生产消息

  如果有多个生产者同时发送消息, 无法确定消息到达 Broker 的前后顺序, 也就无法验证消息的顺序性。

如何保证消息顺序性

  如果要保证消息的顺序性,需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似 Sequence ID )来实现。

弃用 Queueing Consumer

  建议不要使 用这个Queueing Consumer类来实现 订阅消费 。

  乍一看也没什么问题, 而且实际生产环境中如果不是太“傲娇”地使用也不会造成 什么大 问题 。 QueueingConsumer 本身有几个大缺陷,需要读者在使用时特别注意。首当其冲的就 是内存溢出的问题, 如果由于某些原因, 队列之中堆积了比较多的消息, 就可能导致消费者客 户端内存溢出假死,于是发生恶性循环, 队列消息不断堆积而得不到消化 。

TTL

  TTL,Time to Live 的简称,即过期时间。

  RabbitMQ 可以对消息和队列设置 TTL,消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信”(Dead Message),消费者将无法再收到该消息(这点不是绝对的)。

作用范围

  RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue) 设置过期时间:

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms,会对整个队列消息统一过期
  • 设置消息过期时间使用参数:expiration,单位:ms,当该消息在队列头部时(消费时),会单独判断这一消息是否过期

  如果两者都进行了设置,则以时间短的为准。

自动清除

  当消息到达存活时间后,若还没有被消费,会被自动清除。

死信队列

  DLX,全称为 Dead-Letter-Exchange(死信交换机、死信邮箱)。
  当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列。

  DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时 , RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消 息、以进行相应的处理,这个特性与将消息的 TTL 设置为 0 配合使用可以弥补 immediate 参数的功能。

消息如何变成死信?

  消息变成死信一般是由于以下几种情况:

  • 消息过期
  • 队列达到最大长度
  • 消息被拒绝(basicNack/basicReject),井且不把消息重新放入原目标队列(即设置 requeue 参数为 false)

队列如何绑定 DLX?

  可以给队列设置参数:

  • x-dead-letter-exchange:DLX 名称
  • x-dead-letter-routing-key:DLX 和死信队列绑定 的 Routing Key

延迟队列

  延迟队列存储的对象是对应的延迟消息, 所谓“延迟消息”是指当消息被发送以后, 并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费 。

  延迟队列的使用场景有很多,比如:

  • 在订单系统中,一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了
  • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备

  在 AMQP 协议中, 或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。

优先级队列

  优先级队列, 顾名思义, 具有高优先级的队列具有高的优先权, 优先级高的消息具备优先被消费的特权。

  可以通过设置队列的x-max-priority参数来实现。

自动删除

  交换机可以通过将autoDelete属性设置为true启用自动删除功能。

  交换机自动删除的前提是至少有一个队列或者交换机与这个交换机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑。

注意不能错误地把此参数理解为:当与此交换机连接的客户端都断开时,这个交换机自动删除。

  队列可以通过将autoDelete属性设置为true启用自动删除。
  队列自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。

注意不能错误地把此参数理解为:当连接到此队列的所有客户端断开时,这个队列自动删除。

  这两个参数默认均为 false,也就是说交换机和队列默认情况下都未启用自动删除功能,一般情况下我们也不需要要设置。

客户端开发

  详见工程项目代码。

集群

  待研究。

管理

多租户与权限

  每一个 RabbitMQ 服务器都能创建虚拟的消息服务器,我们称之为虚拟主机( virtual host),简称为 vhost。 每一个 vhost 本质上是一个独立的小型 RabbitMQ 服务器, 拥有自己独立的队列、 交换机及绑定关系等, 井且它拥有自己独立的权限。 vhost 就像是虚拟机与物理服务器一样, 它 们在各个实例间提供逻辑上的分离, 为不同程序安全保密地运行数据, 它既能将同 一个 RabbitMQ 中的众多客户区分开, 又可以避免队列和交换机等命名冲突。 vhost 之间是绝对隔离的,无法将 vhostl 中的交换机与vhost2 中的队列进行绑定, 这样既保证了安全性, 又可以确保可移植性。如果在使用 RabbitMQ 达到一定规模的时候, 建议用户对业务功能、场景进行归类区分, 并为之分配独立的 vhost.

  vhost 是 AMQP 概念的基础, 客户端在连接的时候必须制定一个 vhost。 RabbitMQ 默认创 建的 vhost 为“/”, 如果不需要多个 vhost 或者对 vhost 的概念不是很理解, 那么用这个默认的 vhost 也是非常合理的, 使用默认的用户名 guest 和密码 guest 就可以访问它。 但是为了安全和 方便, 建议重新建立一个新的用户来访问它。

用户管理

  在 RabbitMQ 中 ,用户是访 问 控制( Access Control) 的基本单元,且单个用户可以跨越多 个 vhost 进行授权。 针对一至多个 vhost,用户可以被赋予不同级别的访问权限,并使用标准的 用户名和密码来认证用户。

应用与集群管理

应用管理

集群管理

HTTP API 接口管理

  RabbitMQ Management 插件不仅提供了 Web 管理界面, 还提供了 HTTP API 接口来方便调 用 。 比如创建一个队列, 就可以通过 PUT 方法调用/ api/queues/vhost/name 接口来实现 。

日志

  RabbitMQ 默认日志存放路径: `/var/log/rabbitmq/rabbit@xxx.log`

监控

消息追踪

  在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。

  对于 RabbitMQ 而言,可能是因为生产者或消费者与 RabbitMQ 断开了连接,而它们与 RabbitMQ 又采用了不同的确认机制;也有可能是因为交换机与队列之间不同的转发策略;甚至是交换机并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

  在RabbitMQ中可以使用Firehoserabbitmq_tracing插件功能来实现消息追踪。

  注意:打开 trace会影响消息写入功能,适当打开后请关闭。

  • rabbitmqctl trace_on:开启 Firehose 命令
  • rabbitmqctI trace_off:关闭 Firehose 命令

疑问清单

队列应该何时创建

  按照 RabbitMQ 官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。
  这是一个很好的建议,但不适用于所有的情况。

  如果业务本身在架构设计之初己经充分地预估了队列的使用情况, 完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、 RabbitMQ 命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。

  预先创建好资源还有一个好处是:可以确保交换机和队列之间正确地绑定匹配。 很多时候,由于人为因素、代码缺陷等,发送消息的交换机并没有绑定任何队列, 那么消息将会丢失;或者交换机绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。 当然可以配合 mandatory 参数或者备份交换机来提高程序的健壮性。

  与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预定的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。

  如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,程序的灵活性,也完全可以在业务程序中声明队列。

  业务至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。

参考

  • RabbitMQ 官网
  • 朱忠华. RabbitMQ 实战指南 [M].电子工业出版社,2017

文章信息

时间 说明
2023-01-01 初稿
0%